/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#define __STDC_FORMAT_MACROS

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <iostream>

#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#if !defined(PRIu32)
#define PRIu32 "I32u"
#define PRIu64 "I64u"
#endif

#if defined(_WIN32) && (_WIN32_WINNT < 0x0600)
  #define AI_ADDRCONFIG 0x0400
#endif

namespace apache {
namespace thrift {
namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using namespace std;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using boost::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };

/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back data (if any)
 *  5) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  boost::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  boost::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needed to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen.  Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(THRIFT_SOCKET socket,
              TNonblockingIOThread* ioThread,
              const sockaddr* addr,
              socklen_t addrLen) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
    tSocket_.reset(new TSocket());
    init(socket, ioThread, addr, addrLen);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
    * Check buffers against any size limits and shrink it if exceeded.
    *
    * @param readLimit we reduce read buffer size to this (if nonzero).
    * @param writeLimit if nonzero and write buffer is larger, replace it.
    */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(THRIFT_SOCKET socket,
            TNonblockingIOThread* ioThread,
            const sockaddr* addr,
            socklen_t addrLen);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events.  Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  boost::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  boost::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};

class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};

void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
                                           TNonblockingIOThread* ioThread,
                                           const sockaddr* addr,
                                           socklen_t addrLen) {
  tSocket_->setSocketFD(socket);
  tSocket_->setCachedAddress(addr, addrLen);

  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes.  This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf(
          "TNonblockingServer: frame size too large "
          "(%" PRIu32 " > %" PRIu64
          ") from client %s. "
          "Remote side not using TFramedTransport?",
          readWant_,
          (uint64_t)server_->getMaxFrameSize(),
          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();
    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send\n");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}

bool TNonblockingServer::getHeaderTransport() {
  // Currently if there is no output protocol factory,
  // we assume header transport (without having to create
  // a new transport and check)
  return getOutputProtocolFactory() == NULL;
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();

      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        close();
      }

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

  // Intentionally fall through here, the call to process has written into
  // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

  // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    readWant_ += 4;

    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del().  The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed.  However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del().  You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  // Delete the registered libevent
  if (event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
  }

  if (serverEventHandler_) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // release processor and handler
  processor_.reset();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    free(readBuffer_);
    readBuffer_ = NULL;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}

TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    boost::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(boost::shared_ptr<Thread>());
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
                                                                      const sockaddr* addr,
                                                                      socklen_t addrLen) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = NULL;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread, addr, addrLen);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->init(socket, ioThread, addr, addrLen);
  }
  activeConnections_.push_back(result);
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  activeConnections_.erase(std::remove(activeConnections_.begin(),
                                       activeConnections_.end(),
                                       connection),
                           activeConnections_.end());

  if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen.  We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  socklen_t addrLen;
  sockaddr_storage addrStorage;
  sockaddr* addrp = (sockaddr*)&addrStorage;
  addrLen = sizeof(addrStorage);

  // Going to accept a new client socket
  THRIFT_SOCKET clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        ::THRIFT_CLOSESOCKET(clientSocket);
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          ::THRIFT_CLOSESOCKET(clientSocket);
          return;
        }
      }
    }

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
        || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
                          THRIFT_GET_SOCKET_ERROR);
      ::THRIFT_CLOSESOCKET(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      ::THRIFT_CLOSESOCKET(clientSocket);
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      if (!clientConnection->notifyIOThread()) {
        GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
        returnConnection(clientConnection);
      }
    }

    // addrLen is written by the accept() call, so needs to be set before the next call.
    addrLen = sizeof(addrStorage);
  }

  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32

  THRIFT_SOCKET s;

  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    throw TException("TNonblockingServer::serve() getaddrinfo "
                     + string(THRIFT_GAI_STRERROR(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

#ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
#endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
    ::THRIFT_CLOSESOCKET(s);
    freeaddrinfo(res0);
    throw TTransportException(TTransportException::NOT_OPEN,
                              "TNonblockingServer::serve() bind",
                              THRIFT_GET_SOCKET_ERROR);
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}

/**
 * Takes a socket created by listenSocket() and sets various options on it
 * to prepare for use in the server.
 */
void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
      || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
    ::THRIFT_CLOSESOCKET(s);
    throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));

// Set TCP nodelay if available, MAC OS X Hack
// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
#endif

#ifdef TCP_LOW_MIN_RTO
  if (TSocket::getUseLowMinRto()) {
    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
  }
#endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    ::THRIFT_CLOSESOCKET(s);
    throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;

  if (!port_) {
    struct sockaddr_storage addr;
    socklen_t size = sizeof(addr);
    if (!getsockname(serverSocket_, reinterpret_cast<sockaddr*>(&addr), &size)) {
      if (addr.ss_family == AF_INET6) {
        const struct sockaddr_in6* sin = reinterpret_cast<const struct sockaddr_in6*>(&addr);
        listenPort_ = ntohs(sin->sin6_port);
      } else {
        const struct sockaddr_in* sin = reinterpret_cast<const struct sockaddr_in*>(&addr);
        listenPort_ = ntohs(sin->sin_port);
      }
    } else {
      GlobalOutput.perror("TNonblocking: failed to get listen port: ", THRIFT_GET_SOCKET_ERROR);
    }
  }
}

void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager) {
    threadManager->setExpireCallback(
        apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose,
                                     this,
                                     apache::thrift::stdcxx::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}

bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
        && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf(
          "TNonblockingServer: overload ended; "
          "%u dropped (%llu total)",
          nConnectionsDropped_,
          nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}

bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}

void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
  TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
  assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
  connection->forceClose();
}

void TNonblockingServer::stop() {
  if (!port_) {
    listenPort_ = 0;
  }
  // Breaks the event loop in all threads so that they end ASAP.
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->stop();
  }
}

void TNonblockingServer::registerEvents(event_base* user_event_base) {
  userEventBase_ = user_event_base;

  // init listen socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }
  // User-provided event-base doesn't works for multi-threaded servers
  assert(numIOThreads_ == 1 || !userEventBase_);

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);

    shared_ptr<TNonblockingIOThread> thread(
        new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
                      listenPort_,
                      ioThreads_.size());

  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new PlatformThreadFactory(
#if !USE_BOOST_THREAD && !USE_STD_THREAD
        PlatformThreadFactory::OTHER,  // scheduler
        PlatformThreadFactory::NORMAL, // priority
        1,                             // stack size (MB)
#endif
        false // detached
        ));

    assert(ioThreadFactory_.get());

    // intentionally starting at thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // Register the events for the primary (listener) IO thread
  ioThreads_[0]->registerEvents();
}

/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void TNonblockingServer::serve() {

  if (ioThreads_.empty())
    registerEvents(NULL);

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}

TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           THRIFT_SOCKET listenSocket,
                                           bool useHighPriority)
  : server_(server),
    number_(number),
    listenSocket_(listenSocket),
    useHighPriority_(useHighPriority),
    eventBase_(NULL),
    ownEventBase_(false) {
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}

TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_ && ownEventBase_) {
    event_base_free(eventBase_);
    ownEventBase_ = false;
  }

  if (listenSocket_ >= 0) {
    if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
    }
    listenSocket_ = THRIFT_INVALID_SOCKET;
  }

  for (int i = 0; i < 2; ++i) {
    if (notificationPipeFDs_[i] >= 0) {
      if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            THRIFT_GET_SOCKET_ERROR);
      }
      notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
    }
  }
}

void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
      || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0
        || THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException(
          "TNonblockingServer::createNotificationPipe() "
          "FD_CLOEXEC");
    }
  }
}

/**
 * Register the core libevent events onto the proper base.
 */
void TNonblockingIOThread::registerEvents() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == 0);
  eventBase_ = getServer()->getUserEventBase();
  if (eventBase_ == NULL) {
    eventBase_ = event_base_new();
    ownEventBase_ = true;
  }

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  if (listenSocket_ >= 0) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, 0)) {
      throw TException(
          "TNonblockingServer::serve(): "
          "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, 0)) {
    throw TException(
        "TNonblockingServer::serve(): "
        "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}

bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  THRIFT_SOCKET fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  fd_set wfds, efds;
  long ret = -1;
  long kSize = sizeof(conn);
  const char* pos = reinterpret_cast<const char*>(&conn);

  while (kSize > 0) {
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    FD_SET(fd, &wfds);
    FD_SET(fd, &efds);
    ret = select(static_cast<int>(fd + 1), NULL, &wfds, &efds, NULL);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (FD_ISSET(fd, &efds)) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (FD_ISSET(fd, &wfds)) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }

  return true;
}

/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  TNonblockingIOThread* ioThread = (TNonblockingIOThread*)v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = 0;
    const int kSize = sizeof(connection);
    long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == NULL) {
        // this is the command to stop our thread, exit the handler!
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      // exit the loop
      break;
    } else { // nBytes < 0
      if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
          && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
        GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
        ioThread->breakLoop(true);
        return;
      }
      // exit the loop
      break;
    }
  }
}

void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // sets a flag so that the loop exits on the next event
  event_base_loopbreak(eventBase_);

  // event_base_loopbreak() only causes the loop to exit the next time
  // it wakes up.  We need to force it to wake up, in case there are
  // no real events it needs to process.
  //
  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(NULL);
  }
}

void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  bzero((void*)&sp, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick a random priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
  }
#else
  THRIFT_UNUSED_VARIABLE(value);
#endif
}

void TNonblockingIOThread::run() {
  if (eventBase_ == NULL)
    registerEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  // Run libevent engine, never returns, invokes calls to eventHandler
  event_base_loop(eventBase_, 0);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(false);
  }

  // cleans up our registered events
  cleanupEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}

void TNonblockingIOThread::cleanupEvents() {
  // stop the listen socket, if any
  if (listenSocket_ >= 0) {
    if (event_del(&serverEvent_) == -1) {
      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
    }
  }

  event_del(&notificationEvent_);
}

void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}

void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe to both join() ourselves twice, as well as join
      // the current thread as the pthread implementation checks for deadlock.
      thread_->join();
    } catch (...) {
      // swallow everything
    }
  }
}
}
}
} // apache::thrift::server
